Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-2728] Extension for sketch-based statistics #3686

Closed
wants to merge 3 commits into from

Conversation

arnaudfnr
Copy link
Contributor

@arnaudfnr arnaudfnr commented Aug 4, 2017

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

Extension to compute approximate statistics with the use of probabilistic data structure, or sketches.

For now, 4 sketches are supported :

  • HyperLogLog (ApproximateDistinct) for stream's cardinality
  • Count-min Sketch (SketchFrequencies) for computing frequency by element
  • Stream Summary (KMostFrequent) for computing a top k of most frequent elements
  • T-Digest (TDigestQuantiles) for computing quantiles in the stream

The sketches are implemented as Beam Combiners, allowing a user to build the sketch dynamically in the Pipeline and then make some dynamic queries and/or store it in a database.

@arnaudfnr
Copy link
Contributor Author

R: @jkff @lukecwik
CC: @jbonofre @iemejia

@jbonofre
Copy link
Member

jbonofre commented Aug 4, 2017

R: @jbonofre

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed the first file to start. Please don't flatten your commits so that it is easier to follow the edits. Will review the next when I have a free moment.

* <br>This does not means relative error in the estimation <b>can't</b> be higher.
* <br>This only means that, on average, the relative error will be
* lower than the desired relative error.
* <br>Nevertheless, the more elements arrive in the stream, the lower the variation will be.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

elements arrive in the stream -> elements in the PCollection

*
* <br><b>WARNING : </b>
* <br>This does not means relative error in the estimation <b>can't</b> be higher.
* <br>This only means that, on average, the relative error will be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that, on average, the -> that on average the

* </pre>
*
* <br><b>WARNING : </b>
* <br>This does not means relative error in the estimation <b>can't</b> be higher.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

means -> mean that the

* <p>According to the paper, the mean squared error is bounded by the following formula :
* <pre>b(m) / sqrt(m)
* Where m is the number of buckets used (p = log2(m) )
* and b(m) < 1.106 for m > 16 ( p > 4).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

( p -> (p

*
* <p>According to the paper, the mean squared error is bounded by the following formula :
* <pre>b(m) / sqrt(m)
* Where m is the number of buckets used (p = log2(m) )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

) ) -> ))

* the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
*
* <p>This class uses the HyperLogLog algorithm, and more precisely
* the improved version of google (HyperLogLog+).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

google -> Google

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* in order to estimate the cardinality.
*
* @param p precision value for the normal representation
* @param <InputT> the type of the Input {@code Pcollection}'s elements being combined.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Input {@code Pcollection}'s -> input {@code Pcollection}'s

* <pre>{@code 1.1 / sqrt(2^p)}</pre>
* For instance, the estimation {@code ApproximateDistinct.globally(12)}
* will have a relative error of about 2%.
* <br> Also keep in mind that {@code p} cannot be lower than 4,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<br> Also -> <br>Also

/**
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a
* {@code PCollection<KV<K, HyperLogLogPlus>>} that contains an output element mapping each
* distinct key in the input {@code PCollection} to a structure wrapping an HyperLogLog which
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrapping an HyperLogLog -> wrapping a HyperLogLogPlus

}

/**
* Do the same as {@link ApproximateDistinct#globally(int)}, but with a default value for p.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with a default value for p -> with a default value of 18 for p

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest flattening all the package namespaces to just org.apache.beam.sdk.extensions.sketching

Having one package namespace will be fine until there are like 30+ different classes.

What do you think?

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please fix indentation/white space in all the files.

*
* <p>The Space-Saving algorithm summarizes the stream by using a doubly linked-list of buckets
* ordered by the frequency value they represent. Each of these buckets contains a linked-list
* of counters, which estimate the {@code count} for an element as well as the maximum
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop ,

* {@code PTransform}s for finding the k most frequent elements in a {@code PCollection}, or
* the k most frequent values associated with each key in a {@code PCollection} of {@code KV}s.
*
* <p>This class uses the Space-Saving algorithm, introduced in this paper :
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use <a> tags for links

}

@Override
public StreamSummary<InputT> createAccumulator() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix indentation

* <p>The {@code capacity} parameter controls the maximum number of elements the sketch
* can contain. Once this capacity is reached, the least frequent element is dropped each
* time an incoming element is not already present in the sketch.
* Each element in the sketch is associated to a counter, that keeps track of the estimate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop ,
estimate -> estimated


/**
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a
* {@code PCollection<KV<K, StreamSummary>>} that contains an output element mapping each
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PCollection<KV<K, StreamSummary>> -> PCollection<KV<K, StreamSummary<T>>>


@Override
public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) {
acc.offer(record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HyperLogLogPlus can only handle a few types like long, string, byte[], ... when it performs the hashing and for all remaining types it uses toString. There is no guarantee that toString will be consistent for "equal" values.

We should use the coder to encode the input value to a byte[] and offer that to HyperLogLogPlus. We should also update the class comment stating that we need to ensure that the coder is deterministic and perform this validation by calling verify deterministic.

Please add a test for a custom object type which is not supported by MurmurHash (which is used within HyperLogLogPlus).

Copy link
Contributor Author

@arnaudfnr arnaudfnr Aug 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that's a great idea !
I can't have access to the input coder from the combineFn, right ? so I thinkj we should create a method "withCoder" like in the Create transform in order to perform the encodings.

* <p>The {@code capacity} parameter controls the maximum number of elements the sketch
* can contain. Once this capacity is reached, the least frequent element is dropped each
* time an incoming element is not already present in the sketch.
* Each element in the sketch is associated to a counter, that keeps track of the estimate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

drop ,
estimate -> estimated

* .apply(SketchFrequencies.<Integer, String>globally(10000));
* } </pre>
*
* @param capacity the maximum number off distinct elements that the Stream Summary can keep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

off -> of

}

/**
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PCollection<KV<K, InputT>> -> PCollection<KV<K, T>>

* .apply(SketchFrequencies.<String>perKey(10000));
* } </pre>
*
* @param capacity the maximum number off distinct elements that the Stream Summary can keep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

off -> of

@arnaudfnr
Copy link
Contributor Author

@lukecwik Wow thanks for the quick review Luke ! I am gonna work on this.

Copy link
Member

@lukecwik lukecwik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got through 3 out of the 4 main files with plenty of comments. I'm on vacation next week but can continue to review once I get back on August 14th.

import org.slf4j.LoggerFactory;

/**
* {@code PTransform}s that records an estimation of the frequency of each element in a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

{@code PTransform}s -> {@code PTransform}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are several PTransforms as we can go for globally() of perKey() (and most of the Combiners starts like this)

* {@code PCollection}, or the occurrences of values associated with each key in a
* {@code PCollection} of {@code KV}s.
*
* <p>This class uses the Count-min Sketch structure. The papers and other useful information
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use <a> href tags for links.

* <p>Example of use
* <pre> {@code PCollection<String> input = ...;
* PCollection<StreamSummary<String>> ssSketch = input
* .apply(SketchFrequencies.<String>perKey(10000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the example be for KMostFrequent?

* <p>Example of use
* <pre> {@code PCollection<String> input = ...;
* PCollection<StreamSummary<String>> ssSketch = input
* .apply(SketchFrequencies.<String>perKey(10000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This example is using SketchFrequences instead of

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aléas of copy/paste

* <p>Example of use
* <pre> {@code PCollection<KV<Integer, String>> input = ...;
* PCollection<KV<Integer, StreamSummary<String>>> ssSketch = input
* .apply(SketchFrequencies.<Integer, String>globally(10000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the example be for KMostFrequent?

* <br>The implementation comes from Apache Spark :
* https://github.com/apache/spark/tree/master/common/sketch
*/
class SketchFrequencies {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is package private, did you mean to make it that way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it should be public, don't know how I could have forgotten this, thanks.

* more than 0.02% in 99% of the cases.
*
*/
static class CountMinSketchFn
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could support all types instead of just strings by encoding each value to a byte[] and then converting that byte array to a base64 string or something equivalent.

You just want to make sure that the coder is deterministic.

@arnaudfnr
Copy link
Contributor Author

arnaudfnr commented Aug 4, 2017

@lukecwik Many thanks to have taken some of your time for reviewing my work before your holidays.
I am going to make a commit with the style fixes for all classes (even T-Digest). I hope I missed nothing.
For the type issues, I saw your remarks and will work on this next week.

For package flattening, I think you're right. The current organization comes from my work on DQ-Talend where there are more classes inside the packages so now this makes sense to flatten them.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 908abba on ArnaudFnr:sketching into ** on apache:master**.

@coveralls
Copy link

Coverage Status

Changes Unknown when pulling 908abba on ArnaudFnr:sketching into ** on apache:master**.

@jkff
Copy link
Contributor

jkff commented Aug 5, 2017

Hey - acknowledging that I am aware of this PR, but it'll be a few days before I get to it. Swamped with other things.

@arnaudfnr arnaudfnr force-pushed the sketching branch 2 times, most recently from 99ea3df to 49511d3 Compare August 14, 2017 08:06
@coveralls
Copy link

Coverage Status

Coverage increased (+0.01%) to 69.975% when pulling 50a69e4 on ArnaudFnr:sketching into b0b6421 on apache:master.

Copy link
Contributor

@jkff jkff left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I started by reviewing only CountMinSketch because all my comments translate to other sketches too. I would actually suggest splitting these into different PRs - start with only CountMinSketch. There's a number of high-level API questions to resolve here, and it seems better to first resolve them once and for all using one example, before spending time changing the others.

Put briefly, I suggest the following reorganization:

  • Expose only CombineFn's
  • Write detailed javadoc only at class level - do not duplicate any nontrivial documentation

So the class would look something like this:

/**
 * Implements the CountMinSketch datastructure useful for ....
 * ... (talk about the data structure, give references etc) ...
 * Use one of the functions forRelativeErrorAndConfidence() or forDimensions() for to create a CombineFn that can be used in the following ways:
 * ...(talk about how to use it with Combine.globally or perKey, or with a state cell)...
 * Implementation details: ...(if you want, give more details that are less directly relevant to people who just want to estimate some frequencies)...
 */
class CountMinSketchFn<InputT> extends AbstractCombineFn<InputT, CountMinSketch, CountMinSketch>... {
  /** ... Configures the sketch to have the given error bounds: with probability ..., queries will be within ... of the true value ... */
  CountMinSketchFn forRelativeErrorAndConfidence(double fractionOfTotal, double confidence) { ... }

  /** Configures the sketch to have the explicitly specified depth and width. */
  CountMinSketchFn forDimensions(int width, int depth) { ... }
}

*/
package org.apache.beam.sdk.extensions.sketching.frequency;

import com.clearspring.analytics.stream.frequency.CountMinSketch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh neat, I implemented this one - small world :)

}

/**
* A {@code PTransform} that takes an input {@code PCollection<String>} and returns a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The input type should be generic, I think. It's quite valid for this to be any encodable user type, e.g. it could be something like Long, or even another KV<Something, SomethingElse>.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I am going to work on genericity using coders

* {@code PCollection<CountMinSketch>} whose contents is a Count-min sketch that allows to query
* the number of hits for a specific element in the input {@code PCollection}.
*
* <p>The {@code seed} parameters will be used to randomly generate different hash functions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does it benefit the user that they can provide a seed: can we use some default seed? Generally Beam aims to eliminate all tuning knobs that are not strictly necessary, and this one doesn't even influence correctness or performance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this is not strictly necessary. A default seed will indeed be the best thing in most use cases. But we could still let the possibility to tune this parameter, what do you think ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would stick with not adding it until there is a usecase that people need it for.

/**
* A {@code PTransform} that takes an input {@code PCollection<String>} and returns a
* {@code PCollection<CountMinSketch>} whose contents is a Count-min sketch that allows to query
* the number of hits for a specific element in the input {@code PCollection}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please say a few words about how to query the number of hits.

* The {@code seed} parameter will be used to generate a and b for each hash function.
* <br>The Count-min sketch size is constant through the process so the memory use is fixed.
* However, the dimensions are directly linked to the accuracy.
* <br>By default, the relative error is set to 1% with 1% probability that the estimation
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this returns a Combine.Globally, it's not possible to change this on the result of this transform, so "by default" is somewhat misleading.

I think error bounds are something that the user should always configure explicitly and there does not exist a reasonable default value - it is entirely application-specific. And the current function signature does not allow configuring them.

There are a few ways out of this:

  1. Instead of returning a Combine.Globally, create a new wrapper transform for this, with builder methods for relevant parameters. It will be rather boilerplate-y.
  2. Just remove this function and expose only CountMinSketchFn, and have examples in javadoc about how to use it in a pipeline, mentioning various scenarios: Combine.Globally, Combine.PerKey, state cells etc. (not necessary to give code examples for each)
  3. Add more parameters to the current function (after removing seed :) )

I prefer 2 - in particular, because we can always add convenience wrappers later if it turns out that life is too hard without them - but I could be convinced otherwise.


@Override public CountMinSketch mergeAccumulators(Iterable<CountMinSketch> accumulators) {
Iterator<CountMinSketch> it = accumulators.iterator();
if (!it.hasNext()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can accumulators be empty?

return accumulator;
}

@Override public Coder<CountMinSketch> getAccumulatorCoder(CoderRegistry registry,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two functions for returning coders are unnecessary if instead you provide a default coder for CountMinSketch globally, per https://beam.apache.org/contribute/ptransform-style-guide/#providing-default-coders-for-types.

return CountMinSketch.deserialize(BYTE_ARRAY_CODER.decode(inStream));
}

@Override public boolean consistentWithEquals() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't it consistent with equals? I think CountMinSketch implements equals and hashCode deterministically.

} else {
// depth and width as computed in the CountMinSketch constructor from the relative error and
// confidence.
int width = (int) Math.ceil(2 / value.getRelativeError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wonky, there's gotta be a better way to get access to depth and width. One easy way that comes to mind is simply send a PR to StreamLib to expose them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why this is wonky? I compute depth and width exactly the same way as in Count-Min Sketch constructors. I can still create a PR for that though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think @jkff is pointing out that it would be best if we didn't duplicate the internal details of StreamLib and instead asked them to expose this functionality.

I wouldn't block merging this in and instead add a TODO to replace it by calling StreamLib's implementation directly when available and link to a JIRA or PR against StreamLib which exposes it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right I'm going to send a PR to stream-lib and add a Todo in this function then, thanks @lukecwik

* <a>https://github.com/tdunning/t-digest</a>
* However, this version has not been released yet so the issue is still up-to-date.
*/
public static class SerializableTDigest implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to implement Serializable?

@jkff
Copy link
Contributor

jkff commented Aug 14, 2017

Upon talking to @lukecwik I change my recommendation: let's still expose the transforms, but make sure that the factory functions for transforms take the configuration parameters. E.g.: CountMinSketches.{globally,perKey,combineFn}For{RelativeErrorAndConfidence,Dimensions}()

Might change forRelativeErrorAndConfidence to forErrorBounds for brevity.

For example, CountMinSketches.perKeyForErrorBounds(0.01 /* epsOfTotal */, 0.99 /* confidence */)

* @param sp the precision of HyperLogLog+' sparse representation
*/
public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) {
if (sp < p || sp > 32) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you mean p < sp || sp > 32?

* When the sparse representation would require more memory than the normal one,
* it is converted and the normal algorithm applies for the remaining elements.
*
* <p><b>WARNING : </b>Choose sp such that {@code p <= sp <= 32}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<b>WARNING : </b>Choose -> <b>WARNING:</b> Choose

* in order to estimate the cardinality.
*
* @param p precision value for the normal representation
* @param <InputT> the type of the input {@code Pcollection}'s elements being combined.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pcollection -> PCollection

* }</pre>
*
* @param <InputT> type of elements being combined
* @param p number of bits for indexes in the HyperLogLogPlus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indexes -> indices

}

/**
* Do the same as {@link ApproximateDistinct#globally(int)},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the same as -> Same as


private static final Logger LOG = LoggerFactory.getLogger(ApproximateDistinct.class);

// do not instantiate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: indentation

* By calling this builder, you will not use the sparse representation.
* If you want to, see {@link ApproximateDistinctFn#withSparseRepresentation(int)}
*
* <p>Example of use
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example of use -> Example:


/**
* Returns an {@code ApproximateDistinctFn} combiner with the given precision value p.
* This means that the input elements will be dispatched into 2^p buckets
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2^p -> {@code 2^p}


/**
* Do the same as {@link ApproximateDistinct#globally(int)},
* but with a default value of 18 for p.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

18 -> {@code 18}

}

/**
* Do the same as {@link ApproximateDistinct#globally(int)},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the same as -> Same as


@Override
public HyperLogLogPlus addInput(HyperLogLogPlus acc, InputT record) {
acc.offer(record);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
Yes that's a great idea !
I can't have access to the input coder from the combineFn, right ? so I thinkj we should create a method "withCoder" like in the Create transform in order to perform the encodings.

You should add withCoder to the CombineFn.

You should also create a PTransform which sets the input coder from the input PCollection if its unset something like:

private static class ApproximateDistinctTransform<InputT> extends PTransform<PCollection<InputT>, PCollection<HyperLogLogPlus>> {
  private ApproximateDistinctTransform(p, sp, coder) {
    this.p = p;
    this.sp = sp;
    this.coder = coder;
  }
  @Override
  public PCollection<HyperLogLogPlus> expand(PCollection<InputT> input) {
    return input.apply(Combine.globally(new HyperLogLogPlus(p, sp, Objects.firstNonNull(coder, input.getCoder()))));
  }
}

You'll also want a keyed PTransform version for perKey as well.

*
* <p>This class uses the HyperLogLog algorithm, and more precisely
* the improved version of google (HyperLogLog+).
* <br>The implementation comes from Addthis' library Stream-lib : https://github.com/addthis/stream-lib
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
Yes didn't think about that, thanks.

I would suggest inlining the links within the text like:

<br>The implementation comes from <a href="https://github.com/addthis/stream-lib">Addthis' Stream-lib library</a>.

For example, you could rephrase the section linking the papers as:
The <a href="http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf">original HyperLogLog paper</a> provides details the algorithm. The same authors released a <a href="http://cscubs.cs.uni-bonn.de/2016/proceedings/paper-03.pdf">paper</a> with a clearer view of the algorithm. Google released a <a href="https://research.google.com/pubs/pub40671.html">paper</a> containing a modified version named HyperLogLog+.

* the number of distinct values associated with each key in a {@code PCollection} of {@code KV}s.
*
* <p>This class uses the HyperLogLog algorithm, and more precisely
* the improved version of google (HyperLogLog+).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
Done

Done.

}
}

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
nit: fix indentation of lines below

Done.

* @param sp the precision of HyperLogLog+' sparse representation
*/
public ApproximateDistinctFn<InputT> withSparseRepresentation(int sp) {
return new ApproximateDistinctFn<>(this.p, sp);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
Perform validation that p <= sp

Done.

*
* <p>According to the paper, the mean squared error is bounded by the following formula :
* <pre>b(m) / sqrt(m)
* Where m is the number of buckets used (p = log2(m) )
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
) ) -> ))

Done.

*
* <br><b>WARNING : </b>
* <br>This does not means relative error in the estimation <b>can't</b> be higher.
* <br>This only means that, on average, the relative error will be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
that, on average, the -> that on average the

Done.

* <br>This only means that, on average, the relative error will be
* lower than the desired relative error.
* <br>Nevertheless, the more elements arrive in the stream, the lower the variation will be.
* <br>Indeed, this is like when you throw a dice thousands or millions of time :
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
I think you meant dice* but ok for millions, makes more sense.

I believe using die is correct since you are throwing a singular die millions of times.

Also, drop the preceding spaces, time : -> time:

* @param <InputT> the type of the Input {@code Pcollection}'s elements being combined.
*/
public static <InputT> ApproximateDistinctFn<InputT> create(int p) {
return new ApproximateDistinctFn<>(p, 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
I thought about this too, but the validation is already done in the HyperLogLogPlus constructor.
Is there a specific interest to do it there ?

It is much clearer to the caller that they caused the error if we throw an exception immediately when validating the input parameters instead of when it is caused deeper within the stack.

try {
mergedAccum.addAll(accum);
} catch (CardinalityMergeException e) {
// Should never happen because only HyperLogLogPlus accumulators are instantiated.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
Should I remove the Logger then ?

The thrown exception will contain all the details and the user will log it so no point in duplicating logging here also.

* <p>The {@code capacity} parameter controls the maximum number of elements the sketch
* can contain. Once this capacity is reached, the least frequent element is dropped each
* time an incoming element is not already present in the sketch.
* Each element in the sketch is associated to a counter, that keeps track of the estimate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
drop ,
estimate -> estimated

Done.

* <p>The {@code capacity} parameter controls the maximum number of elements the sketch
* can contain. Once this capacity is reached, the least frequent element is dropped each
* time an incoming element is not already present in the sketch.
* Each element in the sketch is associated to a counter, that keeps track of the estimate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
drop ,
estimate -> estimated

Done.


@Override
public StreamSummary<InputT> addInput(StreamSummary<InputT> accumulator, InputT element) {
accumulator.offer(element, 1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
element may not implement a correct equals, for example byte[].

You'll want to create a wrapper types that implement Externalizable which stores a reference to the element, the element coder, and the structural value for the element. This wrapper type should delegate equals to the structural value and its write/read method should use the coder and element to encode/decode the wrapper with the structural value being ephemeral.

Please add a test for this case by counting 1000 new instances of byte[]{ 0x00 } or something similar.

Ping?

* .apply(SketchFrequencies.<String>perKey(10000));
* } </pre>
*
* @param capacity the maximum number off distinct elements that the Stream Summary can keep
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
off -> of

Done.

* <p>Example of use
* <pre> {@code PCollection<KV<Integer, String>> input = ...;
* PCollection<KV<Integer, StreamSummary<String>>> ssSketch = input
* .apply(SketchFrequencies.<Integer, String>globally(10000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
Shouldn't the example be for KMostFrequent?

Done.

* <p>Example of use
* <pre> {@code PCollection<String> input = ...;
* PCollection<StreamSummary<String>> ssSketch = input
* .apply(SketchFrequencies.<String>perKey(10000));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
Shouldn't the example be for KMostFrequent?

Done.

* {@code PTransform}s for finding the k most frequent elements in a {@code PCollection}, or
* the k most frequent values associated with each key in a {@code PCollection} of {@code KV}s.
*
* <p>This class uses the Space-Saving algorithm, introduced in this paper :
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
Use <a> tags for links

Like before, attempt to inline the text with the link like
<a href="http://my/link">paper</a>


private static final Logger LOG = LoggerFactory.getLogger(KMostFrequent.class);

// do not instantiate
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
nit: fix comment indent

Done.


/**
* A {@code PTransform} that takes an input {@code PCollection<KV<K, InputT>>} and returns a
* {@code PCollection<KV<K, StreamSummary>>} that contains an output element mapping each
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
PCollection<KV<K, StreamSummary>> -> PCollection<KV<K, StreamSummary<T>>>

Done.

*
* @param <InputT> the type of the elements being combined
*/
static class KMostFrequentFn<InputT>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
Ok let's go for this

Done.

* The {@code seed} parameter will be used to generate a and b for each hash function.
* <br>The Count-min sketch size is constant through the process so the memory use is fixed.
* However, the dimensions are directly linked to the accuracy.
* <br>By default, the relative error is set to 1% with 1% probability that the estimation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jkff wrote:
Since this returns a Combine.Globally, it's not possible to change this on the result of this transform, so "by default" is somewhat misleading.

I think error bounds are something that the user should always configure explicitly and there does not exist a reasonable default value - it is entirely application-specific. And the current function signature does not allow configuring them.

There are a few ways out of this:

  1. Instead of returning a Combine.Globally, create a new wrapper transform for this, with builder methods for relevant parameters. It will be rather boilerplate-y.
  2. Just remove this function and expose only CountMinSketchFn, and have examples in javadoc about how to use it in a pipeline, mentioning various scenarios: Combine.Globally, Combine.PerKey, state cells etc. (not necessary to give code examples for each)
  3. Add more parameters to the current function (after removing seed :) )

I prefer 2 - in particular, because we can always add convenience wrappers later if it turns out that life is too hard without them - but I could be convinced otherwise.

The style guide says that we should provide PTransforms first, and then CombineFns/DoFns second which is counter to suggestion #2.

I believe we should provide both a PTransform and the CombineFn in this case.

* <a>https://github.com/tdunning/t-digest</a>
* However, this version has not been released yet so the issue is still up-to-date.
*/
public static class SerializableTDigest implements Serializable {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

jkff wrote:
Why does it need to implement Serializable?

Based on the comment, it seems as though version 3.2 was released recently and you should be able to simplify your code here:
https://mvnrepository.com/artifact/com.tdunning/t-digest/3.2

* {@code PCollection}, or the occurrences of values associated with each key in a
* {@code PCollection} of {@code KV}s.
*
* <p>This class uses the Count-min Sketch structure. The papers and other useful information
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lukecwik wrote:
Use <a> href tags for links.

Like before, inline the link within the text like:
<a href="http://my/link">some description</a>

import org.slf4j.LoggerFactory;

/**
* {@code PTransform}s that records an estimation of the frequency of each element in a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ArnaudFnr wrote:
There are several PTransforms as we can go for globally() of perKey() (and most of the Combiners starts like this)

in that case records -> record

@jkff
Copy link
Contributor

jkff commented Aug 17, 2017

It's getting difficult to follow all the comment threads. Please post an explicit update on this PR when you'd like me to take another look. I also stand by my suggestion to start with adding just one of these transforms - for the additional reason that it'll make it easier to deal with Github's shall we say questionable code review UI.

@reuvenlax
Copy link
Contributor

Any update on this PR?

@iemejia
Copy link
Member

iemejia commented Sep 7, 2017

Hi, I suppose @arnaudfnr is a bit busy with the return to school, let me ping him to confirm if he is going to finish this in the next days/weeks, if it is not the case, I will take from the current state and finish his work.

@arnaudfnr
Copy link
Contributor Author

arnaudfnr commented Sep 14, 2017

Hello, I was indeed quite busy the past month but now I am ready to work again regularly on this PR. I am going to submit only the ApproximateDistinct transform as Eugene proposed.

@jkff
Copy link
Contributor

jkff commented Dec 6, 2017

@arnaudfnr I think you were going to split this PR into multiple anyway, right? In that case, can the current PR be closed?

@jkff
Copy link
Contributor

jkff commented Feb 22, 2018

It seems that this PR was already disassembled into smaller PRs - closing.

@jkff jkff closed this Feb 22, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants